package com.example.scg.route;

import com.mongodb.client.model.changestream.OperationType;
import java.util.Optional;
import javax.annotation.PostConstruct;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

/**
 * 注意 ChangeStream 单节点数据库无法开启
 *
 * @author Aaron
 */
@Lazy(false)
@Component
@ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)
public class RouteChangeStreamHandler {

  private final ReactiveMongoTemplate mongoTemplate;
  private final MongoRouteDefinitionRepository routeRepository;

  public RouteChangeStreamHandler(
      MongoRouteDefinitionRepository routeRepository, ReactiveMongoTemplate mongoTemplate) {
    this.routeRepository = routeRepository;
    this.mongoTemplate = mongoTemplate;
  }

  @PostConstruct
  public void run() {
    new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();
  }

  public void startMonitor() {
    Aggregation aggregation = Aggregation.newAggregation(Aggregation
        .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));

    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .filter(aggregation)
        .returnFullDocumentOnUpdate()
        .build();

    String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();
    Flux<ChangeStreamEvent<MongoRouteDefinition>> changeStream = mongoTemplate
        .changeStream(collectionName, options, MongoRouteDefinition.class);

    changeStream
        .log()
        .doOnNext(e -> {
          if (OperationType.INSERT == e.getOperationType()
              || OperationType.UPDATE == e.getOperationType()
              || OperationType.REPLACE == e.getOperationType()) {
            Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);
          } else if (OperationType.DELETE == e.getOperationType()) {
            getId(e).ifPresent(routeRepository::removeCache);
          }
        }).blockLast();
  }

  private Optional<String> getId(ChangeStreamEvent<MongoRouteDefinition> e) {
    return Optional.ofNullable(e.getRaw())
        .flatMap(raw -> Optional.ofNullable(raw.getDocumentKey()))
        .flatMap(docKey -> Optional.ofNullable(docKey.getObjectId("_id")))
        .flatMap(bson -> Optional.of(bson.getValue().toHexString()));
  }
}
